部署KapokMQ无需安装任何外部依赖应用/环境,单文件应用,且配置简单,启动方便。
支持多种消息推送方式(订阅/发布推送、点对点推送、延时消息发布)。
消息队列与客户端采用WebSocket连接,全双工通讯,消息与ACK发送都在一条连接上,简易高效。
集群基于Gossip构建,可自动探测集群节点,能轻易做到水平扩展、断线重连。
具有主从节点消息同步功能,主节点宕机后,从节点可以自动接手消息推送工作。
提供全量数据持久化方案及WAL预写日志记录,用户可在高性能与高可靠之间自由选择。
内置网页端控制台,可直接通过网页查看消息队列的运行情况。
2019年的轻薄本,未充电状态,平衡模式
4核8线程处理器:Intel(R) Core(TM) i5-8265U CPU @ 1.60GHz 1.80 GHz
LPDDR3内存:8.00 GB (7.85 GB 可用)
固态硬盘:WDC PC SN720 SDAPNTW-512G
Windows系统下直接运行./build.bat文件
自动执行打包命令
生成Linux二进制文件与Windows exe文件
GoLand终端cd到项目根目录,执行go build命令,生成exe文件
cmd终端cd到项目根目录,依次执行下列命令:
SET CGO_ENABLED=0
SET GOOS=linux
SET GOARCH=amd64
go build
生成二进制执行文件
在Windows上部署
/kapokmq # 文件根目录
kapokmq.exe # 打包后的exe文件
application.yaml # 配置文件
/log # 日志目录
/view # 前端-Vue项目打包文件
MQDATA # 持久化文件
WAL.log # 预写日志
在Linux上部署
/kapokmq # 文件根目录
kapokmq # 打包后的二进制文件(后台运行指令:setsid ./kapokmq)
application.yaml # 配置文件
/log # 日志目录
/view # 前端-Vue项目打包文件
MQDATA # 持久化文件
WAL.log # 预写日志
http://localhost:port/#/Console
MessageCode string #消息唯一标识码(由消息队列生成)
MessageData string #消息内容(一般为JSON格式的字符串)
Topic string #消息所属主题
CreateTime int64 #消息创建时间(秒级时间戳)
ConsumedTime int64 #消息被消费时间(秒级时间戳)
DelayTime int64 #延迟推送时间(单位:秒)
Status int #消息状态(-1:待消费。0:未到推送时间的延时消息。1:已消费)
ws://localhost:port/Producers/Conn/{topic}/{producerId}
WebSocket链接中的参数:
topic //主题名称
ProducerId //生产者客户端Id
消息队列接收的消息格式
生产者客户端推送给消息队列的Json字符串消息格式
{
"MessageData":"hello",
"DelayTime":0
}
常规消息进入消息通道之前,状态将被设为待消费(Status:-1)。
延时消息进入消息通道之前,状态将被设为未到时(Status:0)。
消息队列接收到该消息后(写入日志后),通过该websocket连接向生产者客户端发送ACK,ACK内容为字符串"ok"
如果生产者客户端选择异步发送消息方式,则可忽略该ACK。
如果要追求消息的可靠性,可以利用该ACK机制发送同步消息,即生产者在发送完一条消息后,等待消息队列发来的ACK后,再继续发送下一条消息。
消息队列发送给生产者的ACK字符串样式
"ok"
ws://localhost:port/Consumers/Conn/{topic}/{consumerId}
WebSocket链接中的参数:
topic //主题名称
consumerId //消费者客户端Id
通过WriteJSON()函数将model.Message类型的消息转为Json字符串发送
消息队列推送给消费者客户端的Json字符串消息格式
{
"MessageCode":"8c01b728ef82ba754a63e61daa43e83c61b744c7",
"MessageData":"hello",
"Topic":"test_topic",
"CreateTime":1640975470,
"ConsumedTime":1640975520,
"DelayTime":0,
"Status":-1
}
消费者客户端接收并处理完该消息后,通过该websocket连接向消息队列异步发送ACK,ACK内容为消息的唯一标识码MessageCode
消息队列接收到ACK后,将指定消息的状态更改为已消费(Status:1)。
如果消息到达超时时间(mq.pushRetryTime,默认为300秒)仍未收到ACK,将进行重推。
消费者客户端发送给消息队列的ACK字符串样式
"8c01b728ef82ba754a63e61daa43e83c61b744c7"
生产者、消费者客户端与消息队列建立连接后,需输入密钥登录
ws://127.0.0.1:8011/Producers/Conn/test_topic/1
服务端回应 2022-01-02 15:14:53
"Please enter the secret key" //提示输入密钥
客户端发送 2022-01-02 15:15:06
"qqq" //输入错误的密钥
服务端回应 2022-01-02 15:15:06
"Secret key matching error" //提示密钥出错
服务端回应 2022-01-02 15:15:06
"Please enter the secret key" //再次提示输入密钥
客户端发送 2022-01-02 15:15:13
"test" //输出正确的密钥
服务端回应 2022-01-02 15:15:13
"Secret key matching succeeded" //提示密钥验证成功
客户端发送 2022-01-02 15:15:15 //生产者客户端可以向消息队列发送消息
"{.. Json SendMessage ..}"
服务端回应 2022-01-02 15:15:15 //消息队列接收到消息后,向生产者发送ACK
"ok" //ACK内容为字符串"ok"
ws://127.0.0.1:8011/Consumers/Conn/test_topic/1
服务端回应 2022-01-02 15:14:53
"Please enter the secret key" //提示输入密钥
客户端发送 2022-01-02 15:15:06
"qqq" //输入错误的密钥
服务端回应 2022-01-02 15:15:06
"Secret key matching error" //提示密钥出错
服务端回应 2022-01-02 15:15:06
"Please enter the secret key" //再次提示输入密钥
客户端发送 2022-01-02 15:15:13
"test" //输出正确的密钥
服务端回应 2022-01-02 15:15:13
"Secret key matching succeeded" //提示密钥验证成功
服务端回应 2022-01-02 15:15:13 //消息队列可以向消费者客户端发送消息
"{.. Json Message ..}"
"{.. Json Message ..}"
客户端发送 2022-01-02 15:15:14 //消费者接收到消息后,向消息队列发送ACK
"8c01b728ef82ba754a63e61daa43e83c61b744c7" //ACK内容为MessageCode
"sdiw2b7quh82basdsa17sdqdqw81d83c61bqdhhu" //可异步发送确认消费ACK
memory/mq.go
使用golang的通道chan充当队列,所有主题的消息都将进入该通道,通道的缓冲空间大小决定了消息队列的吞吐量。
使用sync.Map存储所有消息,用于数据持久化、消息检查、控制台数据获取。
//消息通道,用于存放待消费的消息(有缓冲区)
var messageChan = make(chan models.Message, messageChanBuffer)
// MessageList 消息列表,存放所有消息记录
var MessageList sync.Map
server/producer.go
生产者客户端通过WebSocket连接到消息队列(github.com/gorilla/websocket),并发送消息到消息队列,消息被写入消息通道与消息列表。
额外提供生产者HTTP接口,可通过HTTP请求向消息队列发送消息。
接口URL:
请求方式
POST
Content-Type
form-data
请求Header参数
参数 | 示例值 | 是否必填 | 参数描述 |
---|---|---|---|
secretKey | test | 必填 | 访问密钥 |
请求Body参数
参数 | 示例值 | 是否必填 | 参数描述 |
---|---|---|---|
messageData | hello | 必填 | 消息主体内容 |
topic | test_topic | 必填 | 消息所属主题 |
delayTime | 0 | 必填 | 延时投送时间 |
成功响应示例
{
"code":0,
"msg":"cbcebdfc446e237af323098fd125c5b161b7516c"
}
server/consumer.go
消费者客户端通过WebSocket连接到消息队列(github.com/gorilla/websocket)。
包含订阅/发布、点对点两种推送模式。
消费者客户端接收消息后,将向消息队列发送一条ACK确认字符(内容为消息标识码messageCode),消息队列再根据此ACK将指定messageCode的消息更改为已消费状态。
persistent
全量数据写入:定期将MessageList消息列表中的未消费消息和延时消息转换为[]byte类型数据,并写入二进制文件,类似于Redis RDB持久化方式。
全量数据写入结合WAL日志:定期将内存中的消息全量持久化到二进制文件(不包括已被消费的消息),在两次全量数据持久化之间,每次接收或更新消息操作都将写入WAL日志,最大程度避免消息丢失。
数据恢复:从二进制文件及WAL日志中读取数据,并将数据恢复至MessageList消息列表中,重新推送未消费的消息。
server/check.go
syncConn
主从节点之间通过websocket连接同步消息。需先启动主节点,再启动从节点。
主从节点连接后,从节点会定期对主节点进行心跳探测,如果检测到主节点宕机,从节点会自动开始推送消息给备用消费者客户端,当主节点重连后,从节点将重新关闭推送功能。
cluster/join.go
使用 github.com/hashicorp/memberlist 构建并链接Gossip集群服务。
借助Gossip协议扩散同步的特性,可以随时向集群中添加新的消息队列节点。
console/api.go
加入指定集群
项目配置文件加载
控制台接口
消息通道与消息列表
cors.go 跨域配置
safe.go 安全验证
client.go 客户端模板
message.go 消息模板
node.go 集群节点模板
常规日志与WAL日志写入
fileRW.go 文件读写
persData.go 持久化到硬盘
recovery.go 数据恢复
路由配置
producer.go 生产者消息接收
consumer.go 消费者消息推送
check.go 消息检查-消息重推与过期消息清理
master.go 主节点向从节点发送消息
slave.go 从节点接收消息
sync.go 主从同步初始化
createCode.go 消息标识码生成
localTime.go 获取本地时间
md5Sign.go md5加密
toTimestamp.go 日期字符串转时间戳
css
js
index.html
实现功能 | 功能说明 | 当前进度 |
---|---|---|
Java客户端 | Maven包,websocket连接,Demo:https://gitee.com/dpwgc/kapokmq-java-client | 未完成 |
拉模式消费 | 消费者主动拉取消息队列的消息 | 计划中 |
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。